{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "name": "Beam DataFrames",
      "provenance": [],
      "collapsed_sections": [],
      "toc_visible": true
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "cells": [
    {
      "cell_type": "code",
      "execution_count": null,
      "source": [
        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
        "\n",
        "# Licensed to the Apache Software Foundation (ASF) under one\n",
        "# or more contributor license agreements. See the NOTICE file\n",
        "# distributed with this work for additional information\n",
        "# regarding copyright ownership. The ASF licenses this file\n",
        "# to you under the Apache License, Version 2.0 (the\n",
        "# \"License\"); you may not use this file except in compliance\n",
        "# with the License. You may obtain a copy of the License at\n",
        "#\n",
        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing,\n",
        "# software distributed under the License is distributed on an\n",
        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
        "# KIND, either express or implied. See the License for the\n",
        "# specific language governing permissions and limitations\n",
        "# under the License."
      ],
      "outputs": [],
      "metadata": {
        "cellView": "form",
        "id": "rz2qIC9IL2rI"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Beam DataFrames\n",
        "\n",
        "<button>\n",
        "  <a href=\"https://beam.apache.org/documentation/dsls/dataframes/overview/\">\n",
        "    <img src=\"https://beam.apache.org/images/favicon.ico\" alt=\"Open the docs\" height=\"16\"/>\n",
        "    Beam DataFrames overview\n",
        "  </a>\n",
        "</button>\n",
        "\n",
        "Beam DataFrames provide a pandas-like [DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)\n",
        "API to declare Beam pipelines.\n",
        "\n",
        "> ℹ️ To learn more about Beam DataFrames, take a look at the\n",
        "[Beam DataFrames overview](https://beam.apache.org/documentation/dsls/dataframes/overview) page.\n",
        "\n",
        "First, we need to install Apache Beam with the `interactive` extra for the Interactive runner. We also need to install a version of `pandas` supported by the DataFrame API, which we can get with the `dataframe` extra in Beam 2.34.0 and newer."
      ],
      "metadata": {
        "id": "hDuXLLSZnI1D"
      }
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "source": [
        "# If this is the first time you run this cell, restart the runtime afterwards.\n",
        "%pip install apache-beam[interactive,dataframe]"
      ],
      "outputs": [],
      "metadata": {
        "id": "8QVByaWjkarZ"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "Let's create a small data file of\n",
        "[Comma-Separated Values (CSV)](https://en.wikipedia.org/wiki/Comma-separated_values).\n",
        "It simply includes the dates of the\n",
        "[equinoxes](https://en.wikipedia.org/wiki/Equinox) and\n",
        "[solstices](https://en.wikipedia.org/wiki/Solstice)\n",
        "of the year 2021."
      ],
      "metadata": {
        "id": "aLqdbX4Mgipq"
      }
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "source": [
        "%%writefile solar_events.csv\n",
        "timestamp,event\n",
        "2021-03-20 09:37:00,March Equinox\n",
        "2021-06-21 03:32:00,June Solstice\n",
        "2021-09-22 19:21:00,September Equinox\n",
        "2021-12-21 15:59:00,December Solstice"
      ],
      "outputs": [],
      "metadata": {
        "id": "hZjwAm7qotrJ"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Interactive Beam\n",
        "\n",
        "Pandas has the\n",
        "[`pandas.read_csv`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html)\n",
        "function to easily read CSV files into DataFrames.\n",
        "Beam has the\n",
        "[`beam.dataframe.io.read_csv`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.io.html#apache_beam.dataframe.io.read_csv)\n",
        "function that emulates `pandas.read_csv`, but returns a deferred Beam DataFrame.\n",
        "\n",
        "If you’re using\n",
        "[Interactive Beam](https://beam.apache.org/releases/pydoc/current/apache_beam.runners.interactive.interactive_beam.html),\n",
        "you can use `collect` to bring a Beam DataFrame into local memory as a Pandas DataFrame."
      ],
      "metadata": {
        "id": "Hv_58JulleQ_"
      }
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "source": [
        "import apache_beam as beam\n",
        "import apache_beam.runners.interactive.interactive_beam as ib\n",
        "from apache_beam.runners.interactive.interactive_runner import InteractiveRunner\n",
        "\n",
        "pipeline = beam.Pipeline(InteractiveRunner())\n",
        "\n",
        "# Create a deferred Beam DataFrame with the contents of our csv file.\n",
        "beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('solar_events.csv')\n",
        "\n",
        "# We can use `ib.collect` to view the contents of a Beam DataFrame.\n",
        "ib.collect(beam_df)"
      ],
      "outputs": [
        {
          "output_type": "display_data",
          "data": {
            "application/javascript": "\n        if (typeof window.interactive_beam_jquery == 'undefined') {\n          var jqueryScript = document.createElement('script');\n          jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n          jqueryScript.type = 'text/javascript';\n          jqueryScript.onload = function() {\n            var datatableScript = document.createElement('script');\n            datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n            datatableScript.type = 'text/javascript';\n            datatableScript.onload = function() {\n              window.interactive_beam_jquery = jQuery.noConflict(true);\n              window.interactive_beam_jquery(document).ready(function($){\n                \n              });\n            }\n            document.head.appendChild(datatableScript);\n          };\n          document.head.appendChild(jqueryScript);\n        } else {\n          window.interactive_beam_jquery(document).ready(function($){\n            \n          });\n        }"
          },
          "metadata": {}
        },
        {
          "output_type": "display_data",
          "data": {
            "text/html": [
              "\n",
              "            <link rel=\"stylesheet\" href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\" integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\" crossorigin=\"anonymous\">\n",
              "            <div id=\"progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\" class=\"spinner-border text-info\" role=\"status\">\n",
              "            </div>"
            ],
            "text/plain": [
              "<IPython.core.display.HTML object>"
            ]
          },
          "metadata": {}
        },
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
          ]
        },
        {
          "output_type": "display_data",
          "data": {
            "application/javascript": "\n        if (typeof window.interactive_beam_jquery == 'undefined') {\n          var jqueryScript = document.createElement('script');\n          jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n          jqueryScript.type = 'text/javascript';\n          jqueryScript.onload = function() {\n            var datatableScript = document.createElement('script');\n            datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n            datatableScript.type = 'text/javascript';\n            datatableScript.onload = function() {\n              window.interactive_beam_jquery = jQuery.noConflict(true);\n              window.interactive_beam_jquery(document).ready(function($){\n                \n            $(\"#progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\").remove();\n              });\n            }\n            document.head.appendChild(datatableScript);\n          };\n          document.head.appendChild(jqueryScript);\n        } else {\n          window.interactive_beam_jquery(document).ready(function($){\n            \n            $(\"#progress_indicator_1516f4062e4fc6d4e58f33cf44c41c1d\").remove();\n          });\n        }"
          },
          "metadata": {}
        },
        {
          "output_type": "execute_result",
          "data": {
            "text/html": [
              "<div>\n",
              "<style scoped>\n",
              "    .dataframe tbody tr th:only-of-type {\n",
              "        vertical-align: middle;\n",
              "    }\n",
              "\n",
              "    .dataframe tbody tr th {\n",
              "        vertical-align: top;\n",
              "    }\n",
              "\n",
              "    .dataframe thead th {\n",
              "        text-align: right;\n",
              "    }\n",
              "</style>\n",
              "<table border=\"1\" class=\"dataframe\">\n",
              "  <thead>\n",
              "    <tr style=\"text-align: right;\">\n",
              "      <th></th>\n",
              "      <th>timestamp</th>\n",
              "      <th>event</th>\n",
              "    </tr>\n",
              "  </thead>\n",
              "  <tbody>\n",
              "    <tr>\n",
              "      <th>solar_events.csv:0</th>\n",
              "      <td>2021-03-20 09:37:00</td>\n",
              "      <td>March Equinox</td>\n",
              "    </tr>\n",
              "    <tr>\n",
              "      <th>solar_events.csv:1</th>\n",
              "      <td>2021-06-21 03:32:00</td>\n",
              "      <td>June Solstice</td>\n",
              "    </tr>\n",
              "    <tr>\n",
              "      <th>solar_events.csv:2</th>\n",
              "      <td>2021-09-22 19:21:00</td>\n",
              "      <td>September Equinox</td>\n",
              "    </tr>\n",
              "    <tr>\n",
              "      <th>solar_events.csv:3</th>\n",
              "      <td>2021-12-21 15:59:00</td>\n",
              "      <td>December Solstice</td>\n",
              "    </tr>\n",
              "  </tbody>\n",
              "</table>\n",
              "</div>"
            ],
            "text/plain": [
              "                              timestamp              event\n",
              "solar_events.csv:0  2021-03-20 09:37:00      March Equinox\n",
              "solar_events.csv:1  2021-06-21 03:32:00      June Solstice\n",
              "solar_events.csv:2  2021-09-22 19:21:00  September Equinox\n",
              "solar_events.csv:3  2021-12-21 15:59:00  December Solstice"
            ]
          },
          "metadata": {},
          "execution_count": 3
        }
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 242
        },
        "id": "sKAMXD5ElhYP",
        "outputId": "928d9ad7-ae75-42d7-8dc6-8c5afd730b11"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "Collecting a Beam DataFrame into a Pandas DataFrame is useful to perform\n",
        "[operations not supported by Beam DataFrames](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas#classes-of-unsupported-operations).\n",
        "\n",
        "For example, let's say we want to take only the first two events in chronological order.\n",
        "Since a deferred Beam DataFrame does not have any ordering guarantees,\n",
        "first we need to sort the values.\n",
        "In Pandas, we could first\n",
        "[`df.sort_values(by='timestamp')`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.sort_values.html) and then\n",
        "[`df.head(2)`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.head.html) to achieve this.\n",
        "\n",
        "However, these are\n",
        "[order-sensitive operations](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas#order-sensitive-operations)\n",
        "so using them in a Beam DataFrame raises a\n",
        "[`WontImplementError`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.frame_base.html#apache_beam.dataframe.frame_base.WontImplementError).\n",
        "We can work around this by using `collect` to convert the Beam DataFrame into a Pandas DataFrame."
      ],
      "metadata": {
        "id": "t3Is6dArtN_Z"
      }
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "source": [
        "import apache_beam.runners.interactive.interactive_beam as ib\n",
        "\n",
        "# Collect the Beam DataFrame into a Pandas DataFrame.\n",
        "df = ib.collect(beam_df)\n",
        "\n",
        "# We can now use any Pandas transforms with our data.\n",
        "df.sort_values(by='timestamp').head(2)"
      ],
      "outputs": [
        {
          "output_type": "display_data",
          "data": {
            "text/html": [
              "\n",
              "            <link rel=\"stylesheet\" href=\"https://stackpath.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css\" integrity=\"sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh\" crossorigin=\"anonymous\">\n",
              "            <div id=\"progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\" class=\"spinner-border text-info\" role=\"status\">\n",
              "            </div>"
            ],
            "text/plain": [
              "<IPython.core.display.HTML object>"
            ]
          },
          "metadata": {}
        },
        {
          "output_type": "display_data",
          "data": {
            "application/javascript": "\n        if (typeof window.interactive_beam_jquery == 'undefined') {\n          var jqueryScript = document.createElement('script');\n          jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n          jqueryScript.type = 'text/javascript';\n          jqueryScript.onload = function() {\n            var datatableScript = document.createElement('script');\n            datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n            datatableScript.type = 'text/javascript';\n            datatableScript.onload = function() {\n              window.interactive_beam_jquery = jQuery.noConflict(true);\n              window.interactive_beam_jquery(document).ready(function($){\n                \n            $(\"#progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\").remove();\n              });\n            }\n            document.head.appendChild(datatableScript);\n          };\n          document.head.appendChild(jqueryScript);\n        } else {\n          window.interactive_beam_jquery(document).ready(function($){\n            \n            $(\"#progress_indicator_4486e01c01f75e7a68a4a5fefa9ecd2c\").remove();\n          });\n        }"
          },
          "metadata": {}
        },
        {
          "output_type": "execute_result",
          "data": {
            "text/html": [
              "<div>\n",
              "<style scoped>\n",
              "    .dataframe tbody tr th:only-of-type {\n",
              "        vertical-align: middle;\n",
              "    }\n",
              "\n",
              "    .dataframe tbody tr th {\n",
              "        vertical-align: top;\n",
              "    }\n",
              "\n",
              "    .dataframe thead th {\n",
              "        text-align: right;\n",
              "    }\n",
              "</style>\n",
              "<table border=\"1\" class=\"dataframe\">\n",
              "  <thead>\n",
              "    <tr style=\"text-align: right;\">\n",
              "      <th></th>\n",
              "      <th>timestamp</th>\n",
              "      <th>event</th>\n",
              "    </tr>\n",
              "  </thead>\n",
              "  <tbody>\n",
              "    <tr>\n",
              "      <th>solar_events.csv:0</th>\n",
              "      <td>2021-03-20 09:37:00</td>\n",
              "      <td>March Equinox</td>\n",
              "    </tr>\n",
              "    <tr>\n",
              "      <th>solar_events.csv:1</th>\n",
              "      <td>2021-06-21 03:32:00</td>\n",
              "      <td>June Solstice</td>\n",
              "    </tr>\n",
              "  </tbody>\n",
              "</table>\n",
              "</div>"
            ],
            "text/plain": [
              "                              timestamp          event\n",
              "solar_events.csv:0  2021-03-20 09:37:00  March Equinox\n",
              "solar_events.csv:1  2021-06-21 03:32:00  June Solstice"
            ]
          },
          "metadata": {},
          "execution_count": 4
        }
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 138
        },
        "id": "8haEu6_9iTi7",
        "outputId": "a1e07bdc-c66d-45e5-efff-90b93219c648"
      }
    },
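    {
      "cell_type": "markdown",
      "source": [
        "To see the error for yourself, here is a minimal sketch that calls the order-sensitive `sort_values` on a deferred Beam DataFrame and catches the resulting `WontImplementError`.\n",
        "It assumes that `solar_events.csv` from the earlier cell is still in the working directory.\n",
        "The names `deferred_df` and `caught` are only for illustration, and the exact error message may differ between Beam versions."
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "source": [
        "import apache_beam as beam\n",
        "from apache_beam.dataframe import frame_base\n",
        "from apache_beam.dataframe.io import read_csv\n",
        "\n",
        "# This pipeline is only constructed, never run.\n",
        "pipeline = beam.Pipeline()\n",
        "deferred_df = pipeline | 'Read CSV' >> read_csv('solar_events.csv')\n",
        "\n",
        "caught = None\n",
        "try:\n",
        "  # Sorting depends on row order, which a deferred Beam DataFrame does not guarantee.\n",
        "  deferred_df.sort_values(by='timestamp')\n",
        "except frame_base.WontImplementError as error:\n",
        "  caught = error\n",
        "  print(f'{type(error).__name__}: {error}')"
      ],
      "outputs": [],
      "metadata": {}
    },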
    {
      "cell_type": "markdown",
      "source": [
        "> ℹ️ Note that `collect` is _only_ accessible if you’re using\n",
        "[Interactive Beam](https://beam.apache.org/releases/pydoc/current/apache_beam.runners.interactive.interactive_beam.html)."
      ],
      "metadata": {
        "id": "ZkthQ13pwpm0"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Beam DataFrames to PCollections\n",
        "\n",
        "If you have your data as a Beam DataFrame, you can convert it into a regular PCollection with\n",
        "[`to_pcollection`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection).\n",
        "\n",
        "Converting a Beam DataFrame in this way yields a PCollection with a [schema](https://beam.apache.org/documentation/programming-guide/#what-is-a-schema).\n",
        "This allows us to easily access each property by attribute, for example `element.event` and `element.timestamp`.\n",
        "\n",
        "Sometimes it's more convenient to convert the named tuples to Python dictionaries.\n",
        "We can do that with the\n",
        "[`_asdict`](https://docs.python.org/3/library/collections.html#collections.somenamedtuple._asdict)\n",
        "method."
      ],
      "metadata": {
        "id": "ujRm4K0iP8SX"
      }
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "source": [
        "import apache_beam as beam\n",
        "from apache_beam.dataframe import convert\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('solar_events.csv')\n",
        "\n",
        "  (\n",
        "      # Convert the Beam DataFrame to a PCollection.\n",
        "      convert.to_pcollection(beam_df)\n",
        "\n",
        "      # We get named tuples, we can convert them to dictionaries like this.\n",
        "      | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n",
        "\n",
        "      # Print the elements in the PCollection.\n",
        "      | 'Print' >> beam.Map(print)\n",
        "  )"
      ],
      "outputs": [
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
          ]
        },
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
            "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
            "{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n",
            "{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n"
          ]
        }
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "g22op8rZPvB3",
        "outputId": "bba88b0b-4d19-4d61-dac7-2c168998a2e4"
      }
    },
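    {
      "cell_type": "markdown",
      "source": [
        "As a small sketch of the attribute access mentioned above, the following cell formats each element through `element.event` and `element.timestamp` instead of converting it to a dictionary.\n",
        "The `describe` helper is a hypothetical name used only for this illustration, and the cell assumes that `solar_events.csv` from the earlier cell is still in the working directory."
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "source": [
        "import apache_beam as beam\n",
        "from apache_beam.dataframe import convert\n",
        "from apache_beam.dataframe.io import read_csv\n",
        "\n",
        "\n",
        "def describe(element):\n",
        "  # Each element exposes its schema fields as attributes.\n",
        "  return f'{element.event} at {element.timestamp}'\n",
        "\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  beam_df = pipeline | 'Read CSV' >> read_csv('solar_events.csv')\n",
        "\n",
        "  (\n",
        "      # Convert the Beam DataFrame to a PCollection with a schema.\n",
        "      convert.to_pcollection(beam_df)\n",
        "\n",
        "      # Access the fields by attribute rather than by dictionary key.\n",
        "      | 'Describe' >> beam.Map(describe)\n",
        "\n",
        "      # Print the formatted strings.\n",
        "      | 'Print' >> beam.Map(print)\n",
        "  )"
      ],
      "outputs": [],
      "metadata": {}
    },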
    {
      "cell_type": "markdown",
      "source": [
        "# Pandas DataFrames to PCollections\n",
        "\n",
        "If you have your data as a Pandas DataFrame, you can convert it into a regular PCollection with\n",
        "[`to_pcollection`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_pcollection).\n",
        "\n",
        "Since Pandas DataFrames are not part of any Beam pipeline, we must provide the `pipeline` explicitly."
      ],
      "metadata": {
        "id": "t6xNIO0iPwtn"
      }
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "source": [
        "import pandas as pd\n",
        "import apache_beam as beam\n",
        "from apache_beam.dataframe import convert\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  df = pd.read_csv('solar_events.csv')\n",
        "\n",
        "  (\n",
        "      # Convert the Pandas DataFrame to a PCollection.\n",
        "      convert.to_pcollection(df, pipeline=pipeline)\n",
        "\n",
        "      # We get named tuples, we can convert them to dictionaries like this.\n",
        "      | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n",
        "\n",
        "      # Print the elements in the PCollection.\n",
        "      | 'Print' >> beam.Map(print)\n",
        "  )"
      ],
      "outputs": [
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
          ]
        },
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
            "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
            "{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n",
            "{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n"
          ]
        }
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "YWYVFkvFuksz",
        "outputId": "a3e3e6fa-85ce-4891-95a0-389fba4461a6"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "If you have your data as a PCollection of Pandas DataFrames, you can flatten them into a single PCollection of rows with\n",
        "[`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap).\n",
        "\n",
        "> ℹ️ If the number of elements in each DataFrame can be very different (that is, some DataFrames might contain thousands of elements while others contain only a handful of elements), it might be a good idea to\n",
        "> [`Reshuffle`](https://beam.apache.org/documentation/transforms/python/other/reshuffle).\n",
        "> This basically rebalances the elements in the PCollection, which helps make sure all the workers have a balanced number of elements."
      ],
      "metadata": {
        "id": "z6Q_tyWszkMC"
      }
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "source": [
        "import pandas as pd\n",
        "import apache_beam as beam\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  (\n",
        "      pipeline\n",
        "      | 'Filename' >> beam.Create(['solar_events.csv'])\n",
        "\n",
        "      # Each element is a Pandas DataFrame, so we can do any Pandas operation.\n",
        "      | 'Read CSV' >> beam.Map(pd.read_csv)\n",
        "\n",
        "      # We yield each element of all the DataFrames into a PCollection of dictionaries.\n",
        "      | 'To dictionaries' >> beam.FlatMap(lambda df: df.to_dict('records'))\n",
        "\n",
        "      # Reshuffle to make sure parallelization is balanced.\n",
        "      | 'Reshuffle' >> beam.Reshuffle()\n",
        "\n",
        "      # Print the elements in the PCollection.\n",
        "      | 'Print' >> beam.Map(print)\n",
        "  )"
      ],
      "outputs": [
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
          ]
        },
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
            "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
            "{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n",
            "{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n"
          ]
        }
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "fVWjO2Zfziqu",
        "outputId": "c5db7be4-f764-487a-bc3b-bd5cbad4e396"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "# PCollections to Beam DataFrames\n",
        "\n",
        "If you have your data as a PCollection, you can convert it into a deferred Beam DataFrame with\n",
        "[`to_dataframe`](https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.convert.html#apache_beam.dataframe.convert.to_dataframe).\n",
        "\n",
        "> ℹ️ To convert a PCollection to a Beam DataFrame, each element _must_ have a\n",
        "[schema](https://beam.apache.org/documentation/programming-guide/#what-is-a-schema)."
      ],
      "metadata": {
        "id": "_Dm2u71EIRFr"
      }
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "source": [
        "import csv\n",
        "import apache_beam as beam\n",
        "from apache_beam.dataframe import convert\n",
        "\n",
        "with open('solar_events.csv') as f:\n",
        "  solar_events = [dict(row) for row in csv.DictReader(f)]\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  pcoll = pipeline | 'Create data' >> beam.Create(solar_events)\n",
        "\n",
        "  # Convert the PCollection into a Beam DataFrame\n",
        "  beam_df = convert.to_dataframe(pcoll | 'To Rows' >> beam.Map(\n",
        "      lambda x: beam.Row(\n",
        "          timestamp=x['timestamp'],\n",
        "          event=x['event'],\n",
        "      )\n",
        "  ))\n",
        "\n",
        "  # Print the elements in the Beam DataFrame.\n",
        "  (\n",
        "      convert.to_pcollection(beam_df)\n",
        "      | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n",
        "      | 'Print' >> beam.Map(print)\n",
        "  )"
      ],
      "outputs": [
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
          ]
        },
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n",
            "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n",
            "{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n",
            "{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n"
          ]
        }
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "N6dVNkkEIWa_",
        "outputId": "16556170-fbf6-4980-962c-bb466d0b76b2"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "# PCollections to Pandas DataFrames\n",
        "\n",
        "If you have your data as a PCollection, you can convert it into an in-memory Pandas DataFrame via a\n",
        "[side input](https://beam.apache.org/documentation/programming-guide#side-inputs).\n",
        "\n",
        "> ℹ️ It's recommended to **only** do this if you need to use a Pandas operation that is\n",
        "> [not supported in Beam DataFrames](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas/#classes-of-unsupported-operations).\n",
        "> Converting a PCollection into a Pandas DataFrame consolidates elements from potentially multiple workers into a single worker, which could create a performance bottleneck.\n",
        "\n",
        "> ⚠️ Pandas DataFrames are in-memory data structures, so make sure all the elements in the PCollection fit into memory.\n",
        "> If they don't fit into memory, consider yielding multiple DataFrame elements via\n",
        "> [`FlatMap`](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap)."
      ],
      "metadata": {
        "id": "kj08jOZQQa_q"
      }
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "source": [
        "import csv\n",
        "import pandas as pd\n",
        "import apache_beam as beam\n",
        "\n",
        "with open('solar_events.csv') as f:\n",
        "  solar_events = [dict(row) for row in csv.DictReader(f)]\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  pcoll = pipeline | 'Create data' >> beam.Create(solar_events)\n",
        "\n",
        "  (\n",
        "      pipeline\n",
        "\n",
        "      # Create a single element containing the entire PCollection. \n",
        "      | 'Singleton' >> beam.Create([None])\n",
        "      | 'As Pandas' >> beam.Map(\n",
        "          lambda _, dict_iter: pd.DataFrame(dict_iter),\n",
        "          dict_iter=beam.pvalue.AsIter(pcoll),\n",
        "      )\n",
        "\n",
        "      # Print the Pandas DataFrame.\n",
        "      | 'Print' >> beam.Map(print)\n",
        "  )"
      ],
      "outputs": [
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.\n"
          ]
        },
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "             timestamp              event\n",
            "0  2021-03-20 09:37:00      March Equinox\n",
            "1  2021-06-21 03:32:00      June Solstice\n",
            "2  2021-09-22 19:21:00  September Equinox\n",
            "3  2021-12-21 15:59:00  December Solstice\n"
          ]
        }
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "cHZdiPbOG-sy",
        "outputId": "11c84948-fccf-41fd-c276-7c5803264ff7"
      }
    },
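    {
      "cell_type": "markdown",
      "source": [
        "One possible reading of the `FlatMap` suggestion above is sketched below: instead of building a single DataFrame, a generator splits the side input into several smaller DataFrames.\n",
        "The `to_dataframes` helper and its `chunk_size` parameter are hypothetical names introduced for this illustration.\n",
        "Note that the side input is still consumed by a single element, so treat this as a starting point rather than a scaling recipe."
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "source": [
        "import pandas as pd\n",
        "import apache_beam as beam\n",
        "\n",
        "solar_events = [\n",
        "    {'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'},\n",
        "    {'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'},\n",
        "    {'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'},\n",
        "    {'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'},\n",
        "]\n",
        "\n",
        "\n",
        "def to_dataframes(_, rows, chunk_size):\n",
        "  # Group the rows into lists of at most `chunk_size` items\n",
        "  # and yield each list as its own Pandas DataFrame.\n",
        "  chunk = []\n",
        "  for row in rows:\n",
        "    chunk.append(row)\n",
        "    if len(chunk) == chunk_size:\n",
        "      yield pd.DataFrame(chunk)\n",
        "      chunk = []\n",
        "  if chunk:\n",
        "    yield pd.DataFrame(chunk)\n",
        "\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  pcoll = pipeline | 'Create data' >> beam.Create(solar_events)\n",
        "\n",
        "  (\n",
        "      pipeline\n",
        "      | 'Singleton' >> beam.Create([None])\n",
        "\n",
        "      # Yield several small DataFrames instead of one large DataFrame.\n",
        "      | 'As Pandas chunks' >> beam.FlatMap(\n",
        "          to_dataframes,\n",
        "          rows=beam.pvalue.AsIter(pcoll),\n",
        "          chunk_size=2,\n",
        "      )\n",
        "\n",
        "      # Print each Pandas DataFrame.\n",
        "      | 'Print' >> beam.Map(print)\n",
        "  )"
      ],
      "outputs": [],
      "metadata": {}
    },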
    {
      "cell_type": "markdown",
      "source": [
        "# What's next?\n",
        "\n",
        "* [Beam DataFrames overview](https://beam.apache.org/documentation/dsls/dataframes/overview) -- an overview of the Beam DataFrames API.\n",
        "* [Differences from pandas](https://beam.apache.org/documentation/dsls/dataframes/differences-from-pandas) -- goes through some of the differences between Beam DataFrames and Pandas DataFrames, as well as some of the workarounds for unsupported operations.\n",
        "* [10 minutes to Pandas](https://pandas.pydata.org/pandas-docs/stable/user_guide/10min.html) -- a quickstart guide to Pandas DataFrames.\n",
        "* [Pandas DataFrame API](https://pandas.pydata.org/pandas-docs/stable/reference/frame.html) -- the API reference for Pandas DataFrames.\n",
        "* [Preprocessing with Beam Dataframes](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/dataframe_api_preprocessing.ipynb) -- an example of data preprocessing for ML training using the Beam DataFrames API.\n"
      ],
      "metadata": {
        "id": "UflW6AJp6-ss"
      }
    }
  ]
}